package com.danan.data_collector.initializer;

import com.danan.data_collector.config.EnvironmentConfig;
import com.danan.data_collector.config.SourceConfig;
import com.danan.data_collector.util.MyCdcDeserialization;
import com.danan.data_collector.util.MyOracleCdcDeserialization;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.sqlserver.SqlServerSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.stream.Collectors;

/**
 * Created with IntelliJ IDEA.
 *
 * @Author: NanHuang
 * @Date: 2023/05/13/11:42
 * @Description:
 */
@Configuration
public class DataStreamInitializer {
    @Resource
    private StreamExecutionEnvironment env;
    @Resource
    private SourceConfig sourceConfig;

    private String startupModel;

    @Bean
    public DataStreamSource<String> dataStreamSource(@Autowired EnvironmentConfig environmentConfig) {
        this.startupModel = environmentConfig.getCdcModel();
        switch (sourceConfig.getType().toLowerCase()) {
            case "mysql":
                return getMySQLDataStream();
            case "oracle":
                return getOracleDataStream();
            case "sqlserver":
                return getSqlServerDataStream();
            default:
                throw new RuntimeException("The data source is not defined. Please select a data source from the list [ mysql , oracle , sqlserver ]");
        }
    }

    /**
     * @Description 获取mysql数据流
     * @Param []
     **/
    private DataStreamSource<String> getMySQLDataStream() {
        MySqlSource<String> mySqlSource = null;
        switch (startupModel) {
            case "initial":
                mySqlSource = MySqlSource.<String>builder()
                        .hostname(sourceConfig.getHostname())
                        .port(sourceConfig.getPort())
                        .databaseList(sourceConfig.getDatabase())
                        .tableList(Arrays.stream(sourceConfig.getSchemaList()).map(s -> s + ".*").collect(Collectors.joining(",")))
                        .username(sourceConfig.getUsername())
                        .password(sourceConfig.getPassword())
                        .startupOptions(com.ververica.cdc.connectors.mysql.table.StartupOptions.earliest())
                        .deserializer(new MyCdcDeserialization())
                        .build();
                break;
            case "latest":
                mySqlSource = MySqlSource.<String>builder()
                        .hostname(sourceConfig.getHostname())
                        .port(sourceConfig.getPort())
                        .databaseList(sourceConfig.getDatabase())
                        .tableList(Arrays.stream(sourceConfig.getSchemaList()).map(s -> s + ".*").collect(Collectors.joining(",")))
                        .username(sourceConfig.getUsername())
                        .password(sourceConfig.getPassword())
                        .startupOptions(StartupOptions.latest())
                        .deserializer(new MyCdcDeserialization())
                        .build();
                break;
            default:
                throw new RuntimeException("The cdc model is not defined. Please select a cdc model from the list [ initial , latest ]");
        }
        return env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql_cdc");
    }

    /**
     * @Description 获取oracle数据流
     * @Param []
     **/
    private DataStreamSource<String> getOracleDataStream() {
        String url = String.format(
                "jdbc:oracle:thin:@//%s:%s/%s",
                sourceConfig.getHostname(),
                sourceConfig.getPort(),
                sourceConfig.getDatabase()
        );

        Properties properties = new Properties();
//        properties.setProperty("log.mining.strategy", "online_catalog");
//        properties.setProperty("log.mining.continuous.mine", "true");

//        开启Blob和Clob类型数据采集，否则结果为null
        properties.setProperty("lob.enabled", "true");

//        使用服务名连接Oracle，必须加上此参数
        properties.setProperty("database.url", url);

//        CDB+PDB模式必须设置database.dbname和database.pdb.name两个属性:
//              1 database.dbname是CDB的名字
//              2 database.pdb.name是PDB的名字
        properties.setProperty("database.dbname", "orcl");
        properties.setProperty("database.pdb.name", sourceConfig.getDatabase());

        DebeziumSourceFunction<String> oracleSource = null;

        switch (startupModel) {
            case "initial":
                oracleSource = OracleSource.<String>builder()
                        .hostname(sourceConfig.getHostname())
                        .port(sourceConfig.getPort())
                        .database(sourceConfig.getDatabase())
                        .tableList(Arrays.stream(sourceConfig.getSchemaList()).map(s -> s + ".*").collect(Collectors.joining(",")))
                        .username(sourceConfig.getUsername())
                        .password(sourceConfig.getPassword())
                        .deserializer(new MyOracleCdcDeserialization())
                        .debeziumProperties(properties)
                        .build();
                break;
            case "latest":
                oracleSource = OracleSource.<String>builder()
                        .hostname(sourceConfig.getHostname())
                        .port(sourceConfig.getPort())
                        .database(sourceConfig.getDatabase())
                        .tableList(Arrays.stream(sourceConfig.getSchemaList()).map(s -> s + ".*").collect(Collectors.joining(",")))
                        .username(sourceConfig.getUsername())
                        .password(sourceConfig.getPassword())
                        .startupOptions(com.ververica.cdc.connectors.base.options.StartupOptions.latest())
                        .deserializer(new MyOracleCdcDeserialization())
                        .debeziumProperties(properties)
                        .build();
                break;
            default:
                throw new RuntimeException("The cdc model is not defined. Please select a cdc model from the list [ initial , latest ]");
        }
        return env.addSource(oracleSource);
    }

    /**
     * @Description 获取sqlserver数据流
     * @Param []
     **/
    private DataStreamSource<String> getSqlServerDataStream() {
        DebeziumSourceFunction<String> sqlserverSource = null;
        switch (startupModel) {
            case "initial":
                sqlserverSource = SqlServerSource.<String>builder()
                        .hostname(sourceConfig.getHostname())
                        .port(sourceConfig.getPort())
                        .database(sourceConfig.getDatabase()) // monitor sqlserver database
                        .tableList(Arrays.stream(sourceConfig.getSchemaList()).map(s -> s + ".*").collect(Collectors.joining(","))) // monitor products table
                        .username(sourceConfig.getUsername())
                        .password(sourceConfig.getPassword())
                        .deserializer(new MyOracleCdcDeserialization()) // converts SourceRecord to JSON String
                        .build();
                break;
            case "latest":
                sqlserverSource = SqlServerSource.<String>builder()
                        .hostname(sourceConfig.getHostname())
                        .port(sourceConfig.getPort())
                        .database(sourceConfig.getDatabase()) // monitor sqlserver database
                        .tableList(Arrays.stream(sourceConfig.getSchemaList()).map(s -> s + ".*").collect(Collectors.joining(","))) // monitor products table
                        .username(sourceConfig.getUsername())
                        .password(sourceConfig.getPassword())
                        .startupOptions(com.ververica.cdc.connectors.sqlserver.table.StartupOptions.latest())
                        .deserializer(new MyOracleCdcDeserialization()) // converts SourceRecord to JSON String
                        .build();
                break;
            default:
                throw new RuntimeException("The cdc model is not defined. Please select a cdc model from the list [ initial , latest ]");
        }
        return env.addSource(sqlserverSource);
    }

}
